/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.end2end;

import static org.apache.phoenix.util.TestUtil.analyzeTable;
import static org.apache.phoenix.util.TestUtil.getAllSplits;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@Category(ParallelStatsEnabledTest.class)
@RunWith(Parameterized.class)
public abstract class BaseViewIT extends ParallelStatsEnabledIT {

  protected String tableName;
  protected String schemaName;
  protected String fullTableName;
  protected String tableDDLOptions;
  protected String txProvider;

  public BaseViewIT(String txProvider) {
    StringBuilder optionBuilder = new StringBuilder();
    this.txProvider = txProvider;
    if (txProvider != null) {
      optionBuilder.append(" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "'");
    }
    this.schemaName = "S_" + generateUniqueName();
    this.tableDDLOptions = optionBuilder.toString();
    this.tableName = "T_" + generateUniqueName();
    this.fullTableName = SchemaUtil.getTableName(schemaName, tableName);
  }

  @Parameters(name = "transactionProvider={0}")
  public static synchronized Collection<Object[]> data() {
    return Arrays.asList(new Object[][] { { "OMID" }, { null } });
  }

  protected void testUpdatableViewWithIndex(Integer saltBuckets, boolean localIndex)
    throws Exception {
    String viewName = testUpdatableView(saltBuckets);
    Pair<String, Scan> pair = testUpdatableViewIndex(saltBuckets, localIndex, viewName);
    Scan scan = pair.getSecond();
    String tableName = pair.getFirst();
    // Confirm that dropping the view also deletes the rows in the index
    if (saltBuckets == null) {
      try (Connection conn = DriverManager.getConnection(getUrl())) {
        Table htable = conn.unwrap(PhoenixConnection.class).getQueryServices()
          .getTable(Bytes.toBytes(tableName));
        if (ScanUtil.isLocalIndex(scan)) {
          ScanUtil.setLocalIndexAttributes(scan, 0, HConstants.EMPTY_BYTE_ARRAY,
            HConstants.EMPTY_BYTE_ARRAY, scan.getStartRow(), scan.getStopRow());
        }
        ResultScanner scanner = htable.getScanner(scan);
        Result result = scanner.next();
        // Confirm index has rows
        assertTrue(result != null && !result.isEmpty());

        conn.createStatement().execute("DROP VIEW " + viewName);

        // Confirm index has no rows after view is dropped
        scanner = htable.getScanner(scan);
        result = scanner.next();
        assertTrue(result == null || result.isEmpty());
      }
    }
  }

  protected String testUpdatableView(Integer saltBuckets) throws Exception {
    Connection conn = DriverManager.getConnection(getUrl());
    if (saltBuckets != null) {
      if (tableDDLOptions.length() != 0) tableDDLOptions += ",";
      tableDDLOptions += (" SALT_BUCKETS=" + saltBuckets);
    }
    String viewName = "V_" + generateUniqueName();
    String ddl = "CREATE TABLE " + fullTableName
      + " (k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, k3 DECIMAL, s VARCHAR CONSTRAINT pk PRIMARY KEY (k1, k2, k3))"
      + tableDDLOptions;
    conn.createStatement().execute(ddl);
    ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + fullTableName + " WHERE k1 = 1";
    conn.createStatement().execute(ddl);
    for (int i = 0; i < 10; i++) {
      conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(" + (i % 4) + ","
        + (i + 100) + "," + (i > 5 ? 2 : 1) + ")");
    }
    conn.commit();
    ResultSet rs;

    rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
    assertTrue(rs.next());
    assertEquals(10, rs.getInt(1));
    rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + viewName);
    assertTrue(rs.next());
    assertEquals(3, rs.getInt(1));
    rs = conn.createStatement().executeQuery("SELECT k1, k2, k3 FROM " + viewName);
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(101, rs.getInt(2));
    assertEquals(1, rs.getInt(3));
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(105, rs.getInt(2));
    assertEquals(1, rs.getInt(3));
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(109, rs.getInt(2));
    assertEquals(2, rs.getInt(3));
    assertFalse(rs.next());

    conn.createStatement().execute("UPSERT INTO " + viewName + "(k2,S,k3) VALUES(120,'foo',50.0)");
    conn.createStatement().execute("UPSERT INTO " + viewName + "(k2,S,k3) VALUES(121,'bar',51.0)");
    conn.commit();
    rs = conn.createStatement().executeQuery("SELECT k1, k2 FROM " + viewName + " WHERE k2 >= 120");
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(120, rs.getInt(2));
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(121, rs.getInt(2));
    assertFalse(rs.next());
    conn.close();
    return viewName;
  }

  protected Pair<String, Scan> testUpdatableViewIndex(Integer saltBuckets, String viewName)
    throws Exception {
    return testUpdatableViewIndex(saltBuckets, false, viewName);
  }

  protected Pair<String, Scan> testUpdatableViewIndex(Integer saltBuckets, boolean localIndex,
    String viewName) throws Exception {
    ResultSet rs;
    Connection conn = DriverManager.getConnection(getUrl());
    String viewIndexName1 = "I_" + generateUniqueName();
    String viewIndexPhysicalName = MetaDataUtil.getViewIndexPhysicalName(fullTableName);
    if (localIndex) {
      conn.createStatement()
        .execute("CREATE LOCAL INDEX " + viewIndexName1 + " on " + viewName + "(k3)");
    } else {
      conn.createStatement()
        .execute("CREATE INDEX " + viewIndexName1 + " on " + viewName + "(k3) include (s)");
    }
    conn.createStatement().execute("UPSERT INTO " + viewName + "(k2,S,k3) VALUES(120,'foo',50.0)");
    conn.commit();

    analyzeTable(conn, viewName);
    List<KeyRange> splits = getAllSplits(conn, viewIndexName1);
    // More guideposts with salted, since it's already pre-split at salt buckets
    assertEquals(saltBuckets == null ? 6 : 8, splits.size());

    String query = "SELECT k1, k2, k3, s FROM " + viewName + " WHERE k3 = 51.0";
    rs = conn.createStatement().executeQuery(query);
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(121, rs.getInt(2));
    assertTrue(BigDecimal.valueOf(51.0).compareTo(rs.getBigDecimal(3)) == 0);
    assertEquals("bar", rs.getString(4));
    assertFalse(rs.next());

    ExplainPlan plan = conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class)
      .optimizeQuery().getExplainPlan();
    ExplainPlanAttributes explainPlanAttributes = plan.getPlanStepsAsAttributes();
    String iteratorTypeAndScanSize;
    String clientSortAlgo;
    String expectedTableName;
    String keyRanges;
    String serverFilterBy;

    if (localIndex) {
      iteratorTypeAndScanSize = "PARALLEL " + (saltBuckets == null ? 1 : saltBuckets) + "-WAY";
      expectedTableName = fullTableName;
      keyRanges = "[1,51]";
      clientSortAlgo = "CLIENT MERGE SORT";
      serverFilterBy = "SERVER FILTER BY FIRST KEY ONLY";
    } else {
      iteratorTypeAndScanSize =
        saltBuckets == null ? "PARALLEL 1-WAY" : "PARALLEL " + saltBuckets + "-WAY";
      expectedTableName = viewIndexPhysicalName;
      keyRanges = saltBuckets == null
        ? " [" + Short.MIN_VALUE + ",51]"
        : " [0," + Short.MIN_VALUE + ",51] - [" + (saltBuckets - 1) + "," + Short.MIN_VALUE
          + ",51]";
      clientSortAlgo = saltBuckets == null ? null : "CLIENT MERGE SORT";
      serverFilterBy = null;
    }
    assertEquals(iteratorTypeAndScanSize, explainPlanAttributes.getIteratorTypeAndScanSize());
    assertEquals("RANGE SCAN ", explainPlanAttributes.getExplainScanType());
    assertEquals(expectedTableName, explainPlanAttributes.getTableName());
    assertEquals(clientSortAlgo, explainPlanAttributes.getClientSortAlgo());
    assertEquals(keyRanges, explainPlanAttributes.getKeyRanges());
    assertEquals(serverFilterBy, explainPlanAttributes.getServerWhereFilter());

    String viewIndexName2 = "I_" + generateUniqueName();
    if (localIndex) {
      conn.createStatement()
        .execute("CREATE LOCAL INDEX " + viewIndexName2 + " on " + viewName + "(s)");
    } else {
      conn.createStatement().execute("CREATE INDEX " + viewIndexName2 + " on " + viewName + "(s)");
    }

    // new index hasn't been analyzed yet
    splits = getAllSplits(conn, viewIndexName2);
    assertEquals(saltBuckets == null ? 1 : 3, splits.size());

    // analyze table should analyze all view data
    analyzeTable(conn, fullTableName);
    splits = getAllSplits(conn, viewIndexName2);
    assertEquals(saltBuckets == null ? 6 : 8, splits.size());

    query = "SELECT k1, k2, s FROM " + viewName + " WHERE s = 'foo'";
    Statement statement = conn.createStatement();
    rs = statement.executeQuery(query);
    Scan scan = statement.unwrap(PhoenixStatement.class).getQueryPlan().getContext().getScan();
    assertTrue(rs.next());
    assertEquals(1, rs.getInt(1));
    assertEquals(120, rs.getInt(2));
    assertEquals("foo", rs.getString(3));
    assertFalse(rs.next());

    plan = conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class).optimizeQuery()
      .getExplainPlan();
    explainPlanAttributes = plan.getPlanStepsAsAttributes();

    String physicalTableName;
    if (localIndex) {
      physicalTableName = fullTableName;
      iteratorTypeAndScanSize = "PARALLEL " + (saltBuckets == null ? 1 : saltBuckets) + "-WAY";
      keyRanges = " [" + (2) + ",'foo']";
      clientSortAlgo = "CLIENT MERGE SORT";
    } else {
      physicalTableName = viewIndexPhysicalName;
      iteratorTypeAndScanSize =
        saltBuckets == null ? "PARALLEL 1-WAY" : "PARALLEL " + saltBuckets + "-WAY";
      keyRanges = saltBuckets == null
        ? " [" + (Short.MIN_VALUE + 1) + ",'foo']"
        : " [0," + (Short.MIN_VALUE + 1) + ",'foo'] - [" + (saltBuckets - 1) + ","
          + (Short.MIN_VALUE + 1) + ",'foo']";
      clientSortAlgo = saltBuckets == null ? null : "CLIENT MERGE SORT";
    }
    assertEquals(physicalTableName, explainPlanAttributes.getTableName());
    assertEquals(iteratorTypeAndScanSize, explainPlanAttributes.getIteratorTypeAndScanSize());
    assertEquals("RANGE SCAN ", explainPlanAttributes.getExplainScanType());
    assertEquals(keyRanges, explainPlanAttributes.getKeyRanges());
    assertEquals("SERVER FILTER BY FIRST KEY ONLY", explainPlanAttributes.getServerWhereFilter());
    assertEquals(clientSortAlgo, explainPlanAttributes.getClientSortAlgo());

    conn.close();
    return new Pair<>(physicalTableName, scan);
  }
}
